package com.example.event;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;

/**
 * 事件总线接口的实现类
 */
@SuppressWarnings({"rawtypes"})
public class DisruptorEventBusImpl implements EventBus,
        EventBusImplMBean {

    private static final Logger logger = LoggerFactory
            .getLogger(DisruptorEventBusImpl.class);

    /** 注册的事件接收者 */
    private ConcurrentHashMap<String, CopyOnWriteArraySet<Receiver<?>>> receivers = new ConcurrentHashMap<>();

    private Integer queueSize = 4096;
    private Integer poolSize = 2;
    private Integer poolAwaitTime = 60;

    private ExecutorService pool;
    private RingBuffer<Event> ringBuffer;
    private Disruptor<Event> disruptor;

    public final static EventFactory<Event> FACTORY = new EventFactory<Event>() {
        @Override
        public Event newInstance() {
            return new Event<>();
        }
    };

    public static final EventTranslatorOneArg<Event, Event> TRANSLATOR = new EventTranslatorOneArg<Event, Event>() {

        @Override
        public void translateTo(Event event, long sequence, Event arg0) {
            event.setName(arg0.getName());
            event.setBody(arg0.getBody());
        }
    };

    private class RingeventWork implements WorkHandler<Event> {
        @Override
        public void onEvent(Event event) throws Exception {
            DisruptorEventBusImpl.this.onEvent(event);
        }
    }

    void onEvent(Event event) {
        String name = event.getName();
        if (!receivers.containsKey(name)) {
            logger.warn("事件[{}]没有对应的接收器", name);
            return;
        }
        for (Receiver<?> receiver : receivers.get(name)) {
            try {
                receiver.onEvent(event);
            } catch (ClassCastException e) {
                logger.error("事件[" + event.getName() + "]对象类型不符合接收器声明", e);
            } catch (Throwable t) {
                logger.error("事件[" + event.getName() + "]处理时发生异常", t);
            }
        }
        event.setBody(null);
        event.setName(null);
    }

    /**
     * 根据配置初始化
     */
    @PostConstruct
    public void initialize() {
        ThreadGroup threadGroup = new ThreadGroup("事件模块");
        NamedThreadFactory threadFactory = new NamedThreadFactory(threadGroup, "事件处理线程-");
        pool = Executors.newCachedThreadPool(threadFactory);
        disruptor = new Disruptor<>(FACTORY, queueSize, pool, ProducerType.MULTI, new SleepingWaitStrategy());
        RingeventWork[] works = new RingeventWork[poolSize];
        for (int i = 0; i < poolSize; i++) {
            works[i] = new RingeventWork();
        }
        disruptor.handleEventsWithWorkerPool(works);
        ringBuffer = disruptor.start();
        // 注册MBean， Spring Boot 已默认注册(spring.jmx.enabled=true)，此处不需要了
//        try {
//            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
//            ObjectName name = new ObjectName("com.example.event:type=DisruptorEventBusImpl");
//            mbs.registerMBean(this, name);
//        } catch (Exception e) {
//            logger.error("注册[common-event]的JMX管理接口失败", e);
//        }
    }

    /** 销毁方法 */
    @PreDestroy
    public void destory() {
        shutdown();
    }

    /** 停止状态 */
    private volatile boolean stop;

    /**
     * 关闭事件总线，阻塞方法会等待总线中的全部事件都发送完后再返回
     */
    public void shutdown() {
        if (isStop())
            return;
        stop = true;
        for (; ; ) {
            if (ringBuffer.remainingCapacity() == ringBuffer.getBufferSize()) {
                break;
            }
            Thread.yield();
        }
        // 等待线程池关闭
        disruptor.shutdown();
        pool.shutdown();
        logger.info("开始关闭事件总线线程池");
        try {
            if (!pool.awaitTermination(poolAwaitTime, TimeUnit.SECONDS)) {
                logger.error("无法在预计时间内完成事件总线线程池关闭,尝试强行关闭");
                pool.shutdownNow();
                if (!pool.awaitTermination(poolAwaitTime, TimeUnit.SECONDS)) {
                    logger.error("事件总线线程池无法完成关闭");
                }
            }
        } catch (InterruptedException e) {
            logger.error("事件总线线程池关闭时线程被打断,强制关闭事件总线线程池");
            pool.shutdownNow();
        }
    }

    /**
     * 检查该事件总线是否已经停止服务
     *
     * @return
     */
    public boolean isStop() {
        return stop;
    }

    @Override
    public List<Future<?>> post(Event event) {
        if (event == null) {
            throw new IllegalArgumentException("事件对象不能为空");
        }
        if (stop) {
            throw new IllegalStateException("事件总线已经停止，不能再接收事件");
        }
        ringBuffer.publishEvent(TRANSLATOR, event);
        return Collections.EMPTY_LIST;
    }

    @Override
    public void register(String name, Receiver receiver) {
        if (name == null || receiver == null) {
            throw new IllegalArgumentException("事件名和接收者均不能为空");
        }
        CopyOnWriteArraySet<Receiver<?>> set = receivers.get(name);
        if (set == null) {
            set = new CopyOnWriteArraySet<>();
            CopyOnWriteArraySet<Receiver<?>> prev = receivers.putIfAbsent(name,
                    set);
            set = prev != null ? prev : set;
        }

        set.add(receiver);
    }

    @Override
    public void unregister(String name, Receiver receiver) {
        if (Objects.isNull(name) || Objects.isNull(receiver)) {
            throw new IllegalArgumentException("事件名和接收者均不能为空");
        }
        CopyOnWriteArraySet<Receiver<?>> set = receivers.get(name);
        if (set != null) {
            set.remove(receiver);
        }
    }

    @Override
    public void syncPost(Event event) {
        String name = event.getName();
        if (!receivers.containsKey(name)) {
            logger.warn("事件'{}'没有对应的接收器", name);
            return;
        }
        for (Receiver receiver : receivers.get(name)) {
            try {
                receiver.onEvent(event);
            } catch (Exception e) {
                logger.error("事件[" + event.getName() + "]处理时发生异常", e);
            }
        }
    }

    // JMX管理接口的实现方法

    @Override
    public int getEventQueueSize() {
//		return (int) (ringBuffer.getBufferSize() - ringBuffer
//				.remainingCapacity());
        return ((ThreadPoolExecutor) pool).getQueue().size();
    }

    @Override
    public int getPoolActiveCount() {
        return ((ThreadPoolExecutor) pool).getActiveCount();
    }

    public Integer getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(Integer queueSize) {
        this.queueSize = queueSize;
    }

    public Integer getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(Integer poolSize) {
        this.poolSize = poolSize;
    }

    public Integer getPoolAwaitTime() {
        return poolAwaitTime;
    }

    public void setPoolAwaitTime(Integer poolAwaitTime) {
        this.poolAwaitTime = poolAwaitTime;
    }
}